package cluster

import (
	"context"
	"os"
	"strconv"
	"time"

	"gitee.com/simonxie979/skymeta/logutil"
	"gitee.com/simonxie979/skymeta/network"
	"gitee.com/simonxie979/skymeta/protocol"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

const (
	LockKey    = "/NodeLaunchMutex" // 节点启动锁
	NodePrefix = "/AllNode/"        // 节点前缀
)

type Event_OnConnect func(sessionID uint64, addr string)
type Event_OnDisconnect func(sessionID uint64, err error)
type Event_OnMessage func(sessionID uint64, msg *protocol.SSMessage)

type NodeConfig struct {
	NodeID     uint16 // 节点ID
	ListenAddr string // 监听地址 用于监听本地端口
	RemoteAddr string // 远端地址 用于
}

type Cluster struct {
	ctx context.Context

	nodeID     uint16
	listenAddr string
	remoteAddr string

	etcd *clientv3.Client                           // 节点注册发现模块
	comm *network.Communicator[*protocol.SSMessage] // 通信器
	log  *logutil.Logger                            // 日志对象

	event_OnConnect    Event_OnConnect    // 新连接到达
	event_OnDisconnect Event_OnDisconnect // 连接断开
	event_OnMessage    Event_OnMessage    // 新消息到达
}

// NewCluster Launch a not exist node in cluster.
func NewCluster(ctx context.Context, logger *logutil.Logger, nodeID uint16) *Cluster {
	registryAddr := os.Getenv("etcd_addr")
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{registryAddr},
		DialTimeout: time.Second * 5,
	})
	if err != nil {
		logger.Panicf("Cluster", "dial to etcd server failure. %v", err)
	}

	c := new(Cluster)
	c.ctx = ctx
	c.nodeID = nodeID
	c.listenAddr = os.Getenv("listen_addr")
	c.remoteAddr = os.Getenv("remote_addr")

	c.etcd = cli
	c.comm = network.NewCommunicator[*protocol.SSMessage](ctx, c, logger)
	c.log = logger

	c.event_OnConnect = nil
	c.event_OnDisconnect = nil
	c.event_OnMessage = nil

	return c
}

func (c *Cluster) nodeKey() string {
	return NodePrefix + strconv.FormatUint(uint64(c.nodeID), 10)
}

// listen Listening the cluster node port.
func (c *Cluster) listen() {
	if err := c.comm.ListenTCP("tcp", c.listenAddr); err != nil {
		c.log.Panicf("Cluster", "listen cluster node address port failure. err: %v", err)
	}
}

// lease Create a lease of node key, and keep alive it.
func (c *Cluster) lease() clientv3.LeaseID {
	lease := clientv3.NewLease(c.etcd)
	grantRes, err := lease.Grant(c.ctx, 3)
	if err != nil {
		c.log.Panicf("Cluster", "lease grant failure. err: %v", err)
	}

	kaChan, err := lease.KeepAlive(c.ctx, grantRes.ID)
	if err != nil {
		c.log.Panicf("Cluster", "%s keep alive failure. error: %v", err)
	}

	go func() {
		defer func() {
			// release the lease
			lease.Close()
		}()
		for {
			select {
			case <-c.ctx.Done():
				return
			case _, ok := <-kaChan:
				if !ok {
					return
				}
			}
		}
	}()

	return grantRes.ID
}

// registLock Create a distributed mutex and lock it,
// return a unlock function.
func (c *Cluster) registLock() func() {
	session, err := concurrency.NewSession(c.etcd)
	if err != nil {
		c.log.Panicf("Cluster", "new concurrency session failure. err: %v", err)
	}

	mutex := concurrency.NewMutex(session, LockKey)
	if err := mutex.Lock(c.ctx); err != nil {
		c.log.Panicf("Cluster", "etcd mutex lock failure. err: %v", err)
	}

	unlock := func() {
		mutex.Unlock(c.ctx)
		session.Close()
	}

	return unlock
}

// regist Regist a cluster node based on node id,
// if the node is already registed, will panic.
func (c *Cluster) regist() bool {
	nodeKey := c.nodeKey()
	nodeValue := c.remoteAddr

	// create a transaction
	// if the key does not already exist, will create it
	// otherwise, will report a error
	txn := clientv3.NewKV(c.etcd).Txn(c.ctx)
	txn.If(clientv3.Compare(clientv3.CreateRevision(nodeKey), "=", 0)).
		Then(clientv3.OpPut(nodeKey, nodeValue, clientv3.WithLease(c.lease()))).
		Else(clientv3.OpGet(nodeKey))

	txnRes, err := txn.Commit()
	if err != nil {
		c.log.Panicf("Cluster", "execute transaction failure. err: %v", err)
	}

	if !txnRes.Succeeded {
		c.log.Errorf("Cluster", "%s registry failure. res: %v", nodeKey, txnRes)
		return false
	}

	return true
}

// listNode List alreadt exist cluster node.
func (c *Cluster) listNode() (list []string) {
	resp, err := c.etcd.Get(c.ctx, NodePrefix, clientv3.WithPrefix())
	if err != nil {
		c.log.Panicf("Cluster", "get already exist cluster node failure. err: %v", err)
	}

	for _, kv := range resp.Kvs {
		list = append(list, string(kv.Value))
	}

	return
}

// connect Connect to already exist cluster node.
func (c *Cluster) connect(list []string) {
	for _, addr := range list {
		sessionID, err := c.comm.EstablishTCP("tcp", addr)
		if err != nil {
			c.log.Errorf("Cluster", "connect to %s failure. err: %v", addr, err)
			continue
		}

		c.OnConnect(sessionID, addr)
	}
}

// Init Launch a not exist node in cluster.
// Initalization cluster node, include listen port, registry and connect,
// if either operation fails, will panic.
func (c *Cluster) Init(c1 Event_OnConnect, c2 Event_OnDisconnect, c3 Event_OnMessage) {
	c.event_OnConnect = c1
	c.event_OnDisconnect = c2
	c.event_OnMessage = c3

	c.listen()

	unlock := c.registLock()
	defer unlock()

	list := c.listNode()

	regStatus := false
	for i := 0; i < 3; i++ {
		regStatus = c.regist()
		if regStatus {
			break
		}
		c.log.Warnf("Cluster", "An attempt will be made to register the cluster node again after 1 second")
		time.Sleep(time.Second * 1)
	}

	if !regStatus {
		c.log.Panicf("Cluster", "cluster node register failure")
	}

	c.connect(list)
}

// Close Shutdown the communicator and close all of already exist sessions.
func (c *Cluster) Close() {
	c.comm.Close()
}

// OnConnect Recive a new connect form listener/dialer
// If underlying support it, then convert it to NodeRemote and cache it
func (c *Cluster) OnConnect(sessionID uint64, addr string) {
	c.log.Warnf("Cluster", "remote %X connect with %v", sessionID, addr)
	if c.event_OnConnect != nil {
		c.event_OnConnect(sessionID, addr)
	}
}

// OnDisconnect The session disconnected, the remote device initiates or actively disconnects
func (c *Cluster) OnDisconnect(sessionID uint64, err error) {
	c.log.Warnf("Cluster", "remote %X disconnect. err: %v", sessionID, err)
	if c.event_OnDisconnect != nil {
		c.event_OnDisconnect(sessionID, err)
	}
}

// OnMessage Recive a new message. Just recive the message from session.
// And push it to InPipe.
func (c *Cluster) OnMessage(sessionID uint64, msg *protocol.SSMessage) {
	c.log.Debugf("Cluster", "message from %X --> %v", sessionID, msg)
	if c.event_OnMessage != nil {
		c.event_OnMessage(sessionID, msg)
	}
}

// SendMsg Send message to specified node by session id.
func (c *Cluster) SendMsg(sessionID uint64, data *protocol.SSMessage) {
	c.comm.SendMsg(sessionID, data)
}

// BordcastMsg Send message to all of node.
func (c *Cluster) BordcastMsg(data *protocol.SSMessage) {
	c.comm.BordcastMsg(data)
}
